package com.carol.bigdata.utils

import com.alibaba.fastjson.{JSON, JSONObject}
import com.carol.bigdata.utils.HBaseUtil
import com.carol.bigdata.utils.FuncUtil
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.JavaConverters._ // implicit conversions that enable the asScala / asJava methods


object FeatureUtil {

    /**
     * Expands the JSON `properties` field of a raw record into individual columns.
     *
     * Every element of `raw` except the one at `propertiesIdx` is copied through
     * unchanged; the element at `propertiesIdx` is parsed as a JSON object and
     * replaced by one value per entry in `columns` (missing keys become "null").
     *
     * @param raw           raw record fields; the properties field may be null or the literal string "null"
     * @param propertiesIdx index in `raw` of the JSON properties field
     * @param columns       property keys to extract, in output order
     * @param transIndices  indices into `columns` whose values are moved to the very
     *                      end of the result (currently the only reordering supported)
     * @return the flattened record
     */
    def parseProperties(raw: List[String],
                        propertiesIdx: Int,
                        columns: List[String],
                        transIndices: List[Int] = List()): List[String] = {
        val emptyMap = Map[String, String]()
        var out = List[String]()
        var appendList = List[String]()
        for (i <- raw.indices) {
            if (i != propertiesIdx) out :+= raw(i)
            else out ++= {
                // A null / "null" properties field is treated as an empty JSON object.
                val target = if (raw(i) != null && raw(i) != "null") raw(i) else JSON.toJSON(emptyMap.asJava).toString
                val map: JSONObject = JSON.parseObject(target)
                // No reordering requested: emit every column in declared order.
                if (transIndices.isEmpty)
                    for (col <- columns) yield map.getOrDefault(col, "null").asInstanceOf[String]
                else {
                    // FIX: the inner loop variable used to shadow the outer `i`,
                    // which was confusing and error-prone; renamed to `j`.
                    // Selected columns are buffered and appended after the main loop.
                    for (j <- transIndices) appendList :+= map.getOrDefault(columns(j), "null").asInstanceOf[String]
                    for (j <- columns.indices if !transIndices.contains(j))
                        yield map.getOrDefault(columns(j), "null").asInstanceOf[String]
                }
            }
        }
        // NOTE(review): getOrDefault returns Object; a non-string JSON value would make
        // the asInstanceOf[String] cast throw — assumes all property values are strings.
        out ++ appendList
    }


    /**
     * Aggregates each feature's raw value list into a single string.
     *
     * `featValues(i)` is aggregated according to `featTypes(i)`:
     *  - "int": values are summed (null / "null" count as 0), result is the decimal string;
     *  - "map": result is a JSON object mapping each distinct value to its occurrence count;
     *  - anything else: values look like "key:count"; counts are summed per key
     *    (entries whose count part is the string "null" are skipped), result is a JSON object.
     *
     * @param featValues one list of raw string values per feature
     * @param featTypes  (name, type) pairs aligned with `featValues`
     * @return one aggregated string per feature
     */
    def calFeat(featValues: List[List[String]],
                featTypes: List[(String, String)]): List[String] = {
        var res = List[String]()
        for (i <- featValues.indices) {
            val valueList = featValues(i)
            val (_, valueType) = featTypes(i) // feature name is not used here
            // Numeric features: sum all non-null values.
            if (valueType.toLowerCase == "int")
                res :+= valueList.map(v => if (v != null && v.toLowerCase != "null") v.toInt else 0).sum.toString
            // Distribution features: build a value -> count JSON object.
            else {
                val valueSet = valueList.toSet
                var valueMaps = Map[String, Int]()
                if (valueType.toLowerCase == "map") {
                    // FIX: count occurrences once in O(n) instead of calling
                    // valueList.count(...) per distinct value (O(n * distinct)).
                    val counts = valueList.groupBy(identity).map { case (v, occ) => v -> occ.size }
                    for (value <- valueSet) valueMaps ++= Map(value -> counts(value))
                } else {
                    // "key:count" distributions.
                    // FIX: split every value once up front instead of re-splitting
                    // the whole list for each distinct key.
                    val splitValues = valueList.map(_.split(":"))
                    for (value <- valueSet) {
                        val tmpKey = value.split(":").head
                        valueMaps ++= Map(tmpKey -> splitValues
                          .filter(v => v.head == tmpKey && v.last != null && v.last != "null")
                          .map(_.last.toInt).sum)
                    }
                }
                res :+= JSON.toJSON(valueMaps.asJava).toString
            }
        }
        res
    }

    /**
     * Merges today's per-key feature values into yesterday's running totals.
     *
     * Int-typed columns are summed element-wise; map-typed columns are merged
     * per key via FuncUtil.mapAddByKey. Yesterday's totals are read from HBase
     * columns named "<column>_total".
     *
     * @return a two-element list: the merged int RDD and the merged map RDD,
     *         both keyed by statDay prepended to the (tail of the) original key
     */
    def calTotal(hbaseParams: Map[String, String],
                 spark: SparkSession,
                 statDay: String,
                 rdd: RDD[(List[String], List[String])],
                 table: String,
                 cfList: List[String],
                 keyColumnList: List[String],
                 valueColumnTypes: List[(String, String)],
                 yesterday: List[String]): List[RDD[(List[String], List[String])]] = {
        // Partition the value columns by declared type and record their positions.
        val allColumns = valueColumnTypes.map(_._1)
        val intColumns = valueColumnTypes.collect { case (name, t) if t.toLowerCase == "int" => name }
        val mapColumns = valueColumnTypes.collect { case (name, t) if t.toLowerCase.contains("map") => name }
        val intPositions = intColumns.map(allColumns.indexOf(_))
        val mapPositions = mapColumns.map(allColumns.indexOf(_))

        // Load yesterday's accumulated totals from HBase ("<col>_total" columns).
        val yesterdayTotals: RDD[(List[String], (List[Int], List[Map[String, Int]]))] =
            HBaseUtil.readIntMapRDD(hbaseParams, spark, table, cfList, yesterday, keyColumnList,
                intColumns.map(_ + "_total"), mapColumns.map(_ + "_total"))

        val prevIntRDD: RDD[(List[String], List[Int])] =
            yesterdayTotals.map { case (keys, (ints, _)) => (keys.tail, ints) }
        val prevMapRDD: RDD[(List[String], List[Map[String, Int]])] =
            yesterdayTotals.map { case (keys, (_, maps)) => (keys.tail, maps) }

        // Today's values, projected down to each type's columns.
        val curIntRDD: RDD[(List[String], List[Int])] = rdd.map { case (keys, values) =>
            (keys.tail, intPositions.map(p => values(p).toInt))
        }
        val curMapRDD: RDD[(List[String], List[Map[String, Int]])] = rdd.map { case (keys, values) =>
            (keys.tail, mapPositions.map(p => FuncUtil.jsonObj2MapInt(JSON.parseObject(values(p)))))
        }

        // Merge int columns: element-wise sum per key.
        val mergedIntRDD = prevIntRDD.union(curIntRDD)
          .reduceByKey((a, b) => a.zip(b).map { case (x, y) => x + y })
          .map { case (keys, ints) => (statDay +: keys, ints.map(_.toString)) }
          .filter { case (_, values) => values.nonEmpty }

        // Merge map columns: per-key addition of the distributions.
        val mergedMapRDD: RDD[(List[String], List[String])] = prevMapRDD.union(curMapRDD)
          .reduceByKey((a, b) => a.zip(b).map { case (m1, m2) => FuncUtil.mapAddByKey(m1, m2) })
          .map { case (keys, maps) => (statDay +: keys, maps.map(m => JSON.toJSON(m.asJava).toString)) }
          .filter { case (_, values) => values.nonEmpty }

        List(mergedIntRDD, mergedMapRDD)
    }

    /**
     * Builds an HBase row key of the form "gameId_event" or, when a stat day
     * is supplied, "gameId_event_statDay".
     *
     * @param gameId  game identifier
     * @param event   event name
     * @param statDay optional stat day; null means "no day segment"
     * @return the underscore-joined row key
     */
    def getDataRowKey(gameId: String, event: String, statDay: String = null): String = {
        val base = s"${gameId}_${event}"
        Option(statDay).fold(base)(day => s"${base}_$day")
    }

    /**
     * Normalizes each record's key to List(game_id, time, uid), assuming the
     * incoming key is ordered List(time, game_id, ..., uid). Values pass through.
     */
    def changeProfileRDD(rdd: RDD[(List[String], List[String])]): RDD[(List[String], List[String])] = {
        rdd.map { case (keys, values) => (List(keys(1), keys.head, keys.last), values) }
    }

}
